package base_spark

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.parsing.json.JSON

/** One medicine record parsed from the source JSON file.
  *
  * @param seqNumber sequence number, taken from the JSON key "序号"
  * @param inn       generic drug name, taken from the JSON key "实际通用名"
  */
final case class MediJson(seqNumber: Int, inn: String)

class ExDataFrame {

    /** Reads a JSON file with Spark's built-in JSON reader and demonstrates
      * select / groupBy / sort plus querying a global temporary view.
      * All results are printed; nothing is returned.
      *
      * @param path path to a line-delimited JSON file
      */
    def read_json(path: String): Unit = {

        val dfJson = ExDataFrame.spark.read.json(path)
        // Register a global temporary view. Global temp views live in the
        // reserved `global_temp` database, so SQL must qualify the name.
        dfJson.createGlobalTempView("temptable")

        // Column projection (first 10 rows; result intentionally discarded — demo only).
        dfJson.select("生产企业", "包装").head(10)
        // Row count per (manufacturer, packaging) pair.
        val dfGroup = dfJson.groupBy("生产企业", "包装").count()
        // Sort by count, descending, and print.
        dfGroup.sort(dfGroup("count").desc).show()
        // Save as CSV (disabled)
        //  dfJson.write.csv("dddd")
        // The original unqualified query failed because global temp views must be
        // referenced through the `global_temp` database.
        ExDataFrame.spark.sql("select * from global_temp.temptable").show()
    }

    /** Parses each line of the file as JSON with the RDD API, maps records to
      * [[MediJson]], converts the RDD to a DataFrame via toDF and prints it.
      * Lines that do not parse as a JSON object, or whose "序号" value is not
      * numeric, are skipped instead of crashing the job.
      *
      * @param path path to a line-delimited JSON file
      */
    def read_json_1(path: String): Unit = {
        // Implicits required so RDD[MediJson] gains the toDF method.
        import ExDataFrame.spark.implicits._
        ExDataFrame.sc.textFile(path)
            .map(JSON.parseFull)
            // JSON.parseFull returns Option[Any]; keep only records that parsed to a
            // JSON object. Note the values are Any (numbers come back as Double) —
            // the previous `Map[String, Int]` pattern was erasure-unchecked and
            // threw MatchError on any line that failed to parse.
            .collect { case Some(value: Map[String @unchecked, Any @unchecked]) =>
                (value.getOrElse("序号", ""), value.getOrElse("实际通用名", ""))
            }
            // Go through toDouble so both "1" and JSON-number renderings like "1.0"
            // convert; drop records with a non-numeric sequence number.
            .flatMap { case (seq, inn) =>
                Try(MediJson(seq.toString.toDouble.toInt, inn.toString)).toOption
            }
            .toDF()
            .show()
    }

    /** Builds a DataFrame from an RDD[Row] plus an explicit StructType schema,
      * then prints the mean sequence number per generic name.
      * Unparseable lines and non-numeric sequence numbers are skipped.
      *
      * @param path path to a line-delimited JSON file
      */
    def read_json_2(path: String): Unit = {
        // Explicit schema matching the Row layout below; both fields nullable.
        val schema = StructType(
            Array(
                StructField(name = "seqNumber", dataType = IntegerType, nullable = true),
                StructField(name = "inn", dataType = StringType, nullable = true)
            ))

        val rdd = ExDataFrame.sc.textFile(path)
            .map(JSON.parseFull)
            // Same safe extraction as read_json_1: only lines that parsed to a
            // JSON object survive; values are Any, not Int.
            .collect { case Some(value: Map[String @unchecked, Any @unchecked]) =>
                (value.getOrElse("序号", ""), value.getOrElse("实际通用名", ""))
            }
            .flatMap { case (seq, inn) =>
                Try(Row(seq.toString.toDouble.toInt, inn.toString)).toOption
            }
        ExDataFrame.spark.createDataFrame(rdd, schema).groupBy("inn").mean("seqNumber").show()
    }
}

object ExDataFrame {
    /** Shared Spark configuration: local mode using all available cores. */
    val conf: SparkConf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("dataFrame练习")
    // Build the session from the config and reuse its context, rather than
    // constructing a SparkContext and a SparkSession independently — the old
    // initialization only worked because getOrCreate() happened to pick up the
    // already-running context.
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    /** Entry point: runs the schema-based JSON demo against the bundled resource. */
    def main(args: Array[String]): Unit = {
        //        new ExDataFrame().read_json(base_spark.util.Utils.getResourcePath("/json/1.json"))
        new ExDataFrame().read_json_2(base_spark.util.Utils.getResourcePath("/json/1.json"))
        // Release Spark resources before the JVM exits.
        spark.stop()
    }

}
